package service

import (
	"context"
	"fmt"
	"github.com/tx7do/kratos-transport/broker"
	v1 "kratos_kafka/api/helloworld/v1"
	"kratos_kafka/internal/utils/mq_kafka"
)

// SendMsgToKafka hands the request to s.Uc.SendMsgToKafka, which is expected
// to write the data to Kafka.
//
// It returns an empty response when the use case succeeds; otherwise it
// returns a nil response together with the use case's error, unmodified.
func (s *GreeterService) SendMsgToKafka(ctx context.Context, in *v1.SendMsgToKafkaReq) (*v1.Empty, error) {
	if sendErr := s.Uc.SendMsgToKafka(ctx, in); sendErr != nil {
		return nil, sendErr
	}
	return &v1.Empty{}, nil
}

// HandleReceiveArticleData receives an ArticleData message from Kafka and
// processes it.
//
// It prints the topic and the message to stdout, then passes the message to
// s.Uc.AutoHandleArticleData and returns that call's error unchanged.
// The headers argument is currently unused.
func (s *GreeterService) HandleReceiveArticleData(ctx context.Context, topic string, headers broker.Headers, msg *mq_kafka.ArticleData) error {
	// Terminate the line so that consecutive messages do not run together
	// in the output.
	fmt.Printf("Topic %s,  Msg:%v\n", topic, msg)

	// Note: additional business logic can be added here.
	return s.Uc.AutoHandleArticleData(ctx, msg)
}

// HandleReceiveArticleData2 processes an ArticleData message and then
// acknowledges it manually through the supplied broker event.
//
// It prints the topic and the message to stdout and calls
// s.Uc.AutoHandleArticleData2. If that call fails, its error is returned and
// the message is not acknowledged here. If it succeeds, event.Ack is called;
// an Ack failure is only printed and the handler still returns nil.
// The headers argument is currently unused.
func (s *GreeterService) HandleReceiveArticleData2(ctx context.Context, topic string, event broker.Event, headers broker.Headers, msg *mq_kafka.ArticleData) error {
	// Terminate the line so that consecutive messages do not run together
	// in the output.
	fmt.Printf("Topic %s,  Msg:%v\n", topic, msg)

	// Note: additional business logic can be added here.
	if err := s.Uc.AutoHandleArticleData2(ctx, event, msg); err != nil {
		return err
	}

	// The business logic succeeded, so acknowledge the message manually.
	// An Ack failure is reported but deliberately not returned to the caller.
	if errAck := event.Ack(); errAck != nil {
		fmt.Printf("手动Ack消息失败! err: %v \n", errAck)
	}
	return nil
}

// SendMsgToKafkaByTopicName sends a message to Kafka on a caller-specified
// topic by delegating to s.Uc.SendMsgToKafkaByTopicName.
//
// The use case's response and error are returned unchanged.
func (s *GreeterService) SendMsgToKafkaByTopicName(ctx context.Context, req *v1.SendMsgToKafkaByTopicNameReq) (*v1.Empty, error) {
	resp, err := s.Uc.SendMsgToKafkaByTopicName(ctx, req)
	return resp, err
}
